feat: provide delta writer option to flush buffer after every batch #3675
Conversation
Signed-off-by: Sam Meyer-Reed <[email protected]>
Codecov Report

❌ Patch coverage is
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3675      +/-   ##
==========================================
+ Coverage   75.58%   75.64%    +0.06%
==========================================
  Files         146      146
  Lines       45172    45342      +170
  Branches    45172    45342      +170
==========================================
+ Hits        34141    34299      +158
- Misses       9210     9215        +5
- Partials     1821     1828        +7

View full report in Codecov by Sentry.
// does not extract write_batch_size from WriterProperties
if let Some(write_batch_size) = writer_props.write_batch_size {
    builder = builder.with_write_batch_size(write_batch_size);
}
This feels weird to me, but I wasn't sure how much I wanted to change the write_deltalake function signature here since we provide this write_batch_size setting in the WriterProperties as opposed to a standalone parameter like target_file_size.
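For reference, this is roughly how the setting would be supplied from the Rust side through parquet's `WriterProperties`; treat `with_writer_properties` on the write builder as my assumption about the exact delta-rs API, not something verified against this PR.

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::parquet::file::properties::WriterProperties;
use deltalake::{DeltaOps, DeltaTableError};

// Sketch: write_batch_size rides along inside WriterProperties rather than
// being a standalone parameter like target_file_size.
async fn write_with_batch_size(
    table_uri: &str,
    batches: Vec<RecordBatch>,
) -> Result<(), DeltaTableError> {
    let props = WriterProperties::builder()
        .set_write_batch_size(1024) // rows handed to the parquet encoder per write call
        .build();

    DeltaOps::try_from_uri(table_uri)
        .await?
        .write(batches)
        .with_writer_properties(props) // assumption: the write builder exposes this setter
        .await?;

    Ok(())
}
```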
Oops, I also realized I had pyarrow in my local uv env, so that failing test passed for me locally but not here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has the side effect of creating a lot of small files, right?
I was thinking, couldn't we tie the flushing more closely to the file writing, so the arrow writer becomes aware of how much data has been flushed?
Typing this on a phone, so I hope it's clear:
write_batch --> bytes --> flush to disk --> check whether the flushed data is greater than file_size_limit; if so, close the multipart upload and start a new one
The only catch is that only the last part of a multipart upload is allowed to be smaller (the part written when you close it), so you have to make sure the writer only flushes full-sized parts and somehow knows when it is writing the final part.
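Roughly the shape I have in mind, as a sketch only: the types, sizes, and counters below are made up and just model the bookkeeping, not the real parquet/object_store calls.

```rust
/// Illustrative sketch of the proposed flow; all types and numbers are
/// hypothetical and only model the bookkeeping, not the real writer.
struct MultipartFileWriter {
    file_size_limit: usize, // target size per data file
    min_part_size: usize,   // multipart uploads need full-size parts, except the last one
    buffered: Vec<u8>,      // bytes serialized but not yet uploaded as a part
    flushed: usize,         // bytes already uploaded for the current file
    files_closed: usize,
}

impl MultipartFileWriter {
    fn new(file_size_limit: usize, min_part_size: usize) -> Self {
        Self { file_size_limit, min_part_size, buffered: Vec::new(), flushed: 0, files_closed: 0 }
    }

    /// write_batch --> bytes --> flush full parts --> rotate the file once over the limit.
    fn write_batch(&mut self, batch_bytes: &[u8]) {
        self.buffered.extend_from_slice(batch_bytes);

        // Only flush full parts; a short part is only allowed when closing the upload.
        while self.buffered.len() >= self.min_part_size {
            let part: Vec<u8> = self.buffered.drain(..self.min_part_size).collect();
            self.flushed += part.len(); // in reality: upload_part(part).await
        }

        // Once the flushed plus pending data reaches the target file size,
        // close this multipart upload and start a new file.
        if self.flushed + self.buffered.len() >= self.file_size_limit {
            self.close_current_file();
        }
    }

    fn close_current_file(&mut self) {
        if self.flushed == 0 && self.buffered.is_empty() {
            return; // nothing to close
        }
        // In reality: upload the final (possibly short) part, complete the
        // multipart upload, and record the Add action for the file.
        self.buffered.clear();
        self.flushed = 0;
        self.files_closed += 1;
    }
}

fn main() {
    let mut writer = MultipartFileWriter::new(64, 16);
    for _ in 0..10 {
        writer.write_batch(&[0u8; 20]);
    }
    writer.close_current_file();
    println!("files written: {}", writer.files_closed);
}
```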
Yes, this does currently write many more small files. Ahh ok, I see, yeah that makes a lot of sense. Ok, I'll work on that, thanks!
@ion-elgreco Ok, follow-up question about this: currently (delta-rs/crates/core/src/operations/write/writer.rs, lines 433 to 444 at 2920177) we only call writer.close() once per file, but now I think we have to call writer.close() multiple times per file to get the buffered data out. The way I'm handling this at the moment is to take the metadata from the first flush of a file and accumulate row counts on top of it, but that means we lose the column statistics for the whole set of batches (we only keep the stats from the first batch), which doesn't seem great.
Another option I can think of is merging the stats from each flush and rebuilding the whole metadata at the end, but that seems pretty involved for this. I'm also seeing this, which seems potentially helpful: https://github.com/delta-io/delta-rs/blob/2920177ac5215e192e0182bed93c42c0b4a98b6f/crates/core/src/writer/stats.rs#L436C1-L489C2 Just wanted to run this past you and see if you have an opinion on the best course of action, thanks!
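For what it's worth, the stats-merging option is mechanically fairly simple; a toy sketch with a hypothetical ColumnStats type (not the real structures from stats.rs) of what the per-flush merge would look like:

```rust
use std::collections::HashMap;

/// Hypothetical per-column stats captured from one flush; the real code would
/// pull these from the parquet metadata returned by the writer.
#[derive(Clone, Debug)]
struct ColumnStats {
    min: i64,
    max: i64,
    null_count: i64,
}

/// Merge stats from successive flushes of the same file: widen min/max and
/// sum the null counts so the final Add action covers every batch, not just
/// the first one.
fn merge_stats(acc: &mut HashMap<String, ColumnStats>, flush: HashMap<String, ColumnStats>) {
    for (col, s) in flush {
        match acc.get_mut(&col) {
            Some(m) => {
                m.min = m.min.min(s.min);
                m.max = m.max.max(s.max);
                m.null_count += s.null_count;
            }
            None => {
                acc.insert(col, s);
            }
        }
    }
}

fn main() {
    let mut acc = HashMap::new();
    let flush1 = HashMap::from([("id".to_string(), ColumnStats { min: 3, max: 9, null_count: 0 })]);
    let flush2 = HashMap::from([("id".to_string(), ColumnStats { min: 1, max: 7, null_count: 2 })]);
    merge_stats(&mut acc, flush1);
    merge_stats(&mut acc, flush2);
    println!("{:?}", acc["id"]); // ColumnStats { min: 1, max: 9, null_count: 2 }
}
```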
I think we should avoid calling AsyncArrowWriter.close() before reaching the max file size. reset_writer currently returns the old buffer and writer and creates a new buffer and writer, but that should perhaps change 🤔
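Something along these lines, roughly. This is only a sketch: it assumes AsyncArrowWriter's flush/close/bytes_written behave as documented, and writes into a Vec<u8> purely to keep the example self-contained rather than targeting object storage.

```rust
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::AsyncArrowWriter;

/// Sketch: keep one AsyncArrowWriter open per data file, flush row groups as
/// batches arrive, and only call close() once the file reaches the target size.
async fn write_until_target(
    batches: Vec<RecordBatch>,
    schema: Arc<Schema>,
    target_file_size: usize,
) -> parquet::errors::Result<usize> {
    let mut files_closed = 0;
    let mut wrote_since_rotation = false;
    let mut writer = AsyncArrowWriter::try_new(Vec::<u8>::new(), schema.clone(), None)?;

    for batch in batches {
        writer.write(&batch).await?;
        // Push the buffered rows out as a row group; the file stays open.
        writer.flush().await?;
        wrote_since_rotation = true;

        // Rotate, and only then close, once the target size is reached.
        if writer.bytes_written() >= target_file_size {
            writer.close().await?; // writes the footer; returns the metadata the stats come from
            files_closed += 1;
            writer = AsyncArrowWriter::try_new(Vec::<u8>::new(), schema.clone(), None)?;
            wrote_since_rotation = false;
        }
    }

    // Close the final (possibly smaller than target) file, if it holds any data.
    if wrote_since_rotation {
        writer.close().await?;
        files_closed += 1;
    }
    Ok(files_closed)
}

#[tokio::main]
async fn main() -> parquet::errors::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1_i64; 10_000]))],
    )
    .unwrap();
    let files = write_until_target(vec![batch.clone(), batch.clone(), batch], schema, 64 * 1024).await?;
    println!("files written: {files}");
    Ok(())
}
```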
# Description

This was based on me taking a closer look at the `PartitionWriter` code, and realizing it could be made a bit more efficient by deferring the parquet writing to a background task and removing the async locking associated with `AsyncShareableBuffer`. Instead, it switches the underlying writer to use a `ParquetObjectWriter`, which internally uses a `BufWriter`, which keeps a certain capacity of bytes in memory until reaching a threshold, at which point it starts a multipart write. Additionally, once a writer has enough bytes in progress/about to be flushed, we finish its writing on a background tokio task instead of yielding while putting new writes on the new file.

# Related Issue(s)

Probably closes delta-io#3578, as it still uses a small buffer internally through `ParquetObjectWriter` but adaptively flushes via a multipart upload using `BufWriter` for streaming writes. I'm also hoping this solves the issue I faced in delta-io#3855, though I'll aim to test this in production.

- closes delta-io#3675 (superseded)

# Documentation

- [`ParquetObjectWriter`](https://docs.rs/parquet/56.2.0/parquet/arrow/async_writer/struct.ParquetObjectWriter.html)
- [`BufWriter`](https://docs.rs/object_store/0.12.4/object_store/buffered/struct.BufWriter.html)

---------

Signed-off-by: Abhi Agarwal <[email protected]>
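A rough, self-contained sketch of that shape follows. Assumptions: it uses an in-memory ObjectStore, an arbitrary buffer capacity, and a JoinSet standing in for whatever task management the actual PR uses, and it goes through object_store's `BufWriter` directly rather than `ParquetObjectWriter` to keep the example small.

```rust
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;
use tokio::task::JoinSet;

/// Sketch of the described design: each data file is written through a
/// BufWriter (buffered in memory up to a capacity, then streamed as a
/// multipart upload), and the close/finalize step is handed off to a
/// background task so the caller can start the next file immediately.
/// The store, file names, and sizes here are placeholders.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1_i64; 8_192]))],
    )?;

    let mut background_closes = JoinSet::new();

    for file_idx in 0..3 {
        let path = Path::from(format!("part-{file_idx}.parquet"));
        // Keep up to ~5 MiB in memory before falling back to a multipart upload.
        let buf = BufWriter::with_capacity(store.clone(), path, 5 * 1024 * 1024);
        let mut writer = AsyncArrowWriter::try_new(buf, schema.clone(), None)?;

        writer.write(&batch).await?;

        // Finish this file off the hot path; the loop moves on to the next file.
        background_closes.spawn(async move { writer.close().await });
    }

    while let Some(res) = background_closes.join_next().await {
        res??; // surface both task panics and parquet errors
    }
    Ok(())
}
```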
Description
This gives the delta writer the option to flush the in-memory buffer to disk after each record batch, as opposed to waiting until the targeted file size is reached, by passing the `flush_per_batch` parameter. This prevents accumulating a lot of memory in some cases. Note: I believe there is still some accumulated memory usage through things like transaction metadata.
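Conceptually, the option toggles something like the following in the batch loop. This is a hypothetical helper using parquet's `AsyncArrowWriter::flush` to stand in for the buffer flush; the real `PartitionWriter` logic is more involved.

```rust
use arrow::record_batch::RecordBatch;
use parquet::arrow::async_writer::{AsyncArrowWriter, AsyncFileWriter};
use parquet::errors::Result;

/// Hypothetical core of the write loop: with `flush_per_batch` set, every
/// record batch is pushed out of the writer's in-memory buffer right away
/// instead of accumulating until the target file size is reached.
async fn write_batches<W: AsyncFileWriter>(
    writer: &mut AsyncArrowWriter<W>,
    batches: &[RecordBatch],
    flush_per_batch: bool,
) -> Result<()> {
    for batch in batches {
        writer.write(batch).await?;
        if flush_per_batch {
            // Bounds the writer's in-memory buffer at the cost of more,
            // smaller row groups.
            writer.flush().await?;
        }
    }
    Ok(())
}
```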
Related Issue(s)
Documentation